package com.stars.distributed.schedule.mq.rocketmq.apache;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.fastjson.parser.Feature;
import com.stars.distributed.schedule.client.DistributedTaskExecutionResult;
import com.stars.distributed.schedule.client.DistributedTaskExecutionResultWrapper;
import com.stars.distributed.schedule.exception.DistributedScheduleMQException;
import com.stars.distributed.schedule.mq.AbstractMQListener;
import com.stars.distributed.schedule.util.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * RocketMQ监听类
 *
 * @author guoguifang
 */
public class RocketMQListener extends AbstractMQListener {

    private DefaultMQPushConsumer consumer;
    private String nameServer;
    private String groupName;
    private String topic;
    private volatile boolean running;

    @Override
    public synchronized void start() throws DistributedScheduleMQException {
        if (this.running) {
            if (logger.isDebugEnabled()) {
                logger.error("The RocketMQ listener({}) is already running!", topic);
            }
            return;
        }

        this.consumer = new DefaultMQPushConsumer(groupName + topic);
        this.consumer.setNamesrvAddr(nameServer);
        this.consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        try {
            this.consumer.subscribe(topic, "*");
        } catch (MQClientException e) {
            throw new DistributedScheduleMQException("The RocketMQ listener(" + topic + ") subscribe failure!", e);
        }
        this.consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExtList, ConsumeConcurrentlyContext context) {
                logger.info("The RocketMQ listener({}) receive message [{}].", topic, messageExtList);
                for (MessageExt messageExt : messageExtList) {
                    String text;
                    try {
                        text = new String(messageExt.getBody(), "utf-8");
                    } catch (UnsupportedEncodingException e) {
                        logger.error("The RocketMQ listener(" + topic + ") receive message '" + messageExtList + "' conversion failure!", e);
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                    if (StringUtils.isBlank(text)) {
                        logger.error("The RocketMQ listener(" + topic + ") receive message '" + messageExt + "' body is blank!");
                        continue;
                    }
                    DistributedTaskExecutionResult distributedTaskExecutionResult;
                    try {
                        distributedTaskExecutionResult = JSON.parseObject(text, new TypeReference<DistributedTaskExecutionResult>() {}, Feature.SupportNonPublicField);
                    } catch (Exception e) {
                        logger.error("The RocketMQ listener(" + topic + ") receive message [" + text + "] convert class 'DistributedTaskExecutionResult' failure!", e);
                        continue;
                    }
                    if (distributedTaskExecutionResult == null) {
                        logger.error("The RocketMQ listener(" + topic + ") receive message [" + text + "] is invalid!");
                        continue;
                    }
                    DistributedTaskExecutionResultWrapper distributedTaskExecutionResultWrapper = new DistributedTaskExecutionResultWrapper(distributedTaskExecutionResult);
                    if (distributedTaskExecutionResultWrapper.getId() == null || distributedTaskExecutionResultWrapper.getVersion() == null) {
                        logger.error("The RocketMQ listener(" + topic + ") receive message [" + distributedTaskExecutionResult + "] id or version is null!");
                        continue;
                    }
                    updateSubTaskStatus(distributedTaskExecutionResultWrapper);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        try {
            this.consumer.start();
            this.running = true;
            logger.info("The RocketMQ listener({}) start up success!", topic);
        } catch (MQClientException e) {
            throw new DistributedScheduleMQException("The RocketMQ listener(" + topic + ") start up failure!", e);
        }
    }

    @Override
    public synchronized void shutdown() {
        if (!this.running) {
            if (logger.isDebugEnabled()) {
                logger.debug("The RocketMQ listener({}) is not started!", topic);
            }
            return;
        }
        try {
            this.consumer.shutdown();
            logger.info("The RocketMQ listener({}) shutdown success!", topic);
        } catch (Exception e) {
            logger.error("The RocketMQ listener(" + topic + ") shutdown failure!", e);
        }
        this.consumer = null;
        this.running = false;
    }

    @Override
    public synchronized boolean isRunning() {
        return this.running;
    }

    public RocketMQListener(String nameServer, String groupName, String topic) {
        this.nameServer = nameServer;
        this.groupName = groupName;
        this.topic = topic;
    }
}
